# feat: DA hints in p2p #2891

base: main

## Conversation
**Claude** finished @alpe's task in 2m 51s

### Comprehensive Code Review: DA Hints in P2P

I've completed a thorough review of PR #2891, which adds DA height hints to the p2p layer. Overall, this is a well-designed feature with good test coverage. Below are my findings, organized by category.

#### Executive Summary

Strengths:
Critical Issues:
#### 1. Architecture & Design ⭐⭐⭐⭐½

**Strengths**

The `P2PEnvelope` generic wrapper pattern is excellent:

```go
type P2PEnvelope[H header.Header[H]] struct {
	Message      H
	DAHeightHint uint64
}
```

This design:
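To make the pattern concrete, here is a minimal, self-contained sketch of the generic envelope idea. The `Header` constraint and `mockHeader` type below are simplified stand-ins for illustration, not the actual ev-node interfaces:

```go
package main

import "fmt"

// Header is a simplified stand-in for the go-header Header constraint
// used by ev-node; the real interface is much richer.
type Header interface {
	Height() uint64
}

// P2PEnvelope pairs a gossiped message with an optional DA height hint.
// A hint of 0 means "no hint available".
type P2PEnvelope[H Header] struct {
	Message      H
	DAHeightHint uint64
}

// mockHeader is a hypothetical header type for demonstration only.
type mockHeader struct{ height uint64 }

func (m mockHeader) Height() uint64 { return m.height }

func main() {
	env := P2PEnvelope[mockHeader]{
		Message:      mockHeader{height: 42},
		DAHeightHint: 1337, // DA block where the message was included
	}
	fmt.Println(env.Message.Height(), env.DAHeightHint)
}
```

Because the envelope is generic over the header type, one wrapper serves both signed headers and data payloads without duplicating the hint-carrying logic.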
**Concerns**

1. **Binary Compatibility Testing Gaps** (`types/p2p_envelope_test.go:92-117`)

The tests verify one-way compatibility (old format → new type), but are missing:
**Recommendation:** Add integration tests simulating mixed-version networks and document the rollout strategy clearly.

2. **Interface Segregation** (`block/internal/common/expected_interfaces.go:18-22`)

The `Broadcaster` interface combines three concerns:

```go
type Broadcaster[H header.Header[H]] interface {
	WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
	AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error
	GetByHeight(ctx context.Context, height uint64) (H, uint64, error)
}
```

**Recommendation:** Consider splitting into

#### 2. Error Handling & Resilience ⭐⭐⭐⭐

**Strengths**
**Issues**

1. **Silent Error Ignoring** (`block/internal/submitting/da_submitter.go:213-216`)

```go
if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, hashes...); err != nil {
	s.logger.Error().Err(err).Msg("failed to append da height hint in header p2p store")
	// ignoring error here, since we don't want to block the block submission
}
```

**Issue:** This creates an inconsistency window where DA submission succeeds but hints aren't stored. Syncing nodes may miss optimization opportunities.

**Recommendation:**
2. **Async DA Retrieval Error Handling** (`block/internal/syncing/async_da_retriever.go:98-102`)

```go
events, err := r.retriever.RetrieveFromDA(r.ctx, height)
if err != nil {
	r.logger.Debug().Err(err).Uint64("height", height).Msg("async DA retrieval failed")
	return
}
```

**Issue:** Transient DA failures are dropped permanently. No retry mechanism.

**Recommendation:** Implement bounded retry with exponential backoff, or re-queue failed heights for later retry.

#### 3. Concurrency & Thread Safety ⭐⭐⭐⭐

**Strengths**
**Critical Issue**

**Race Condition in `SetProcessedHeight`** (`block/internal/syncing/p2p_handler.go:62-72`)

```go
func (h *P2PHandler) SetProcessedHeight(height uint64) {
	for range 1_000 {
		current := h.processedHeight.Load()
		if height <= current {
			return
		}
		if h.processedHeight.CompareAndSwap(current, height) {
			return
		}
	}
}
```

**Issues:**
**Recommendation:**

```go
func (h *P2PHandler) SetProcessedHeight(height uint64) {
	const maxRetries = 100
	for i := 0; i < maxRetries; i++ {
		current := h.processedHeight.Load()
		if height <= current {
			return
		}
		if h.processedHeight.CompareAndSwap(current, height) {
			return
		}
		if i > 10 && i%10 == 0 {
			h.logger.Warn().Int("retries", i).Uint64("height", height).Msg("high CAS contention")
		}
	}
	h.logger.Error().Uint64("height", height).Msg("failed to update processed height after max retries")
}
```

2. **Async DA Retriever Shutdown** (`block/internal/syncing/async_da_retriever.go:50-57`)

✅ Clean shutdown pattern - workers respect context cancellation and properly signal completion via `WaitGroup`.

#### 4. Security ⭐⭐⭐⭐⭐

**Excellent Security Practices**

1. **Proposer Validation** (`block/internal/syncing/p2p_handler.go:132-137`)

```go
func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error {
	if !bytes.Equal(h.genesis.ProposerAddress, proposerAddr) {
		return fmt.Errorf("proposer address mismatch: got %x, expected %x",
			proposerAddr, h.genesis.ProposerAddress)
	}
	return nil
}
```

✅ Prevents accepting blocks from unauthorized proposers.

2. **Data Hash Validation** (`block/internal/syncing/p2p_handler.go:104-108`)

```go
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
	err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment)
	h.logger.Warn().Uint64("height", height).Err(err).Msg("discarding inconsistent block from P2P")
	return err
}
```

✅ Cryptographic validation prevents inconsistent data/header pairs.

3. **DA Height Hint Tampering**

**Issue:** DA hints are not cryptographically authenticated. Malicious peers can provide incorrect hints.

**Impact:** Low - the worst case is inefficient DA fetching (falls back to the canonical source), but it could enable DoS via resource exhaustion.

**Recommendation:**
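One possible mitigation, sketched here under assumed names (`validateDAHint`, `maxHintSkew`, and the window policy are hypothetical, not from the PR): sanity-check peer-supplied hints against the node's own DA view before scheduling a retrieval, so a malicious peer cannot point workers at arbitrary heights.

```go
package main

import "fmt"

// validateDAHint is a hypothetical guard: accept a peer-supplied hint only
// if it is non-zero and within a bounded window of our local DA head.
// maxHintSkew bounds how far ahead of the local DA view a hint may point.
func validateDAHint(hint, localDAHeight, maxHintSkew uint64) bool {
	if hint == 0 {
		return false // no hint provided
	}
	if hint > localDAHeight+maxHintSkew {
		return false // implausibly far in the future; likely bogus
	}
	return true
}

func main() {
	fmt.Println(validateDAHint(105, 100, 10)) // within window: true
	fmt.Println(validateDAHint(500, 100, 10)) // too far ahead: false
	fmt.Println(validateDAHint(0, 100, 10))   // no hint: false
}
```

Invalid hints would simply fall back to the existing canonical DA scan, so the check only limits the damage a bad hint can do; it never blocks syncing.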
#### 5. Performance ⭐⭐⭐⭐

**Strengths**

1. **Worker Pool Pattern** (`block/internal/syncing/async_da_retriever.go:40-48`)

```go
func (r *AsyncDARetriever) Start(ctx context.Context) {
	r.ctx, r.cancel = context.WithCancel(ctx)
	for i := 0; i < 5; i++ {
		r.wg.Add(1)
		go r.worker()
	}
}
```

✅ Bounded concurrency prevents resource exhaustion. A buffer size of 100 is reasonable.

2. **In-Flight Deduplication** (`block/internal/syncing/async_da_retriever.go:59-76`)

```go
func (r *AsyncDARetriever) RequestRetrieval(height uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.inFlight[height]; exists {
		return
	}
	// ...
}
```

✅ Prevents duplicate DA fetches for the same height.

**Concerns**

1. **Hot Lock Path** (`block/internal/syncing/async_da_retriever.go:62`)

Every `RequestRetrieval` call acquires a mutex. Under a high block rate, this could become a bottleneck.

**Recommendation:** Use `sync.Map` or sharded locks if profiling shows contention.

2. **No Back-Pressure Handling** (`block/internal/syncing/async_da_retriever.go:73-75`)

```go
default:
	r.logger.Debug().Uint64("height", height).Msg("DA retrieval worker pool full, dropping request")
}
```

**Issue:** Silent drop on a full queue. Under sustained load, DA retrieval could fall behind indefinitely.

**Recommendation:** Add metrics for dropped requests and consider prioritizing recent heights.

#### 6. Testing ⭐⭐⭐⭐

**Strengths**

1. **Comprehensive Async DA Retriever Tests** (`block/internal/syncing/async_da_retriever_test.go`)

✅ Tests cover:
2. **Binary Compatibility Tests** (`types/p2p_envelope_test.go:92-153`)

✅ Verifies the old wire format can be read by new code.

**Coverage Gaps (from Codecov)**

1. `types/p2p_envelope.go`: 53.42% coverage - missing 29 lines. Focus on error paths in `MarshalBinary`/`UnmarshalBinary`.

2. `block/internal/syncing/syncer.go`: 58.62% coverage - missing 12 lines. Likely the DA-hint-triggered retrieval logic (lines 608-618).

**Recommendation:** Add an integration test that:
#### 7. Documentation ⭐⭐⭐

**Good**
**Missing**

1. **Migration Guide**

There is NO documentation on how to upgrade existing networks. Required information:
2. **ADR Missing**

A breaking p2p change should have an Architecture Decision Record explaining:
#### 8. Breaking Change Assessment

**Network Impact**

Proto Changes:
**Binary Compatibility Claim:** ✅ Forward compatible: old nodes can deserialize new messages (the optional field is ignored).

**Store Impact**

The p2p store format changes from

**Question:** What happens to existing store entries? Are they readable? Looking at the proto, the wire format is compatible (same field numbers 1-3), so existing data should be readable with hint=0. ✅ Store appears forward-compatible.

#### 9. Recommendations by Priority

**P0 - Must Fix Before Merge**
**P1 - Should Fix Soon**

**P2 - Nice to Have**
#### 10. Specific Code Issues

**Issue 1: Infinite Deferred Cleanup Risk**

Location: `block/internal/syncing/async_da_retriever.go:92-96`

```go
func (r *AsyncDARetriever) processRetrieval(height uint64) {
	defer func() {
		r.mu.Lock()
		delete(r.inFlight, height)
		r.mu.Unlock()
	}()
```

**Issue:** If retrieval panics before the defer runs, the height stays in `inFlight` forever.

**Fix:** Already handled correctly by the defer at the top of the function. ✅ No issue.

**Issue 2: Potential Nil Pointer**

Location: `pkg/sync/sync_service.go:184-198`

```go
func (s *SyncService[V]) AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error {
	entries := make([]V, 0, len(hashes))
	for _, h := range hashes {
		v, err := s.store.Get(ctx, h)
		if err != nil {
			if errors.Is(err, header.ErrNotFound) {
				continue
			}
			return err
		}
		v.SetDAHint(daHeight)
		entries = append(entries, v)
	}
	return s.store.Append(ctx, entries...)
}
```

**Analysis:** Could the store be nil? `ss, err := goheaderstore.NewStore[V](...)` - if `NewStore` returns nil with an error, the constructor returns the error. ✅ No issue.

**Issue 3: DAHeightHints Array Bounds**

Location: `block/internal/syncing/syncer.go:597-606`

```go
if _, exists := s.cache.GetDataDAIncluded(event.Data.Hash().String()); !exists {
	daHeightHints = append(daHeightHints, event.DaHeightHints[1])
}
```

**Issue:** Direct index access.

**Fix:** Use named indices or constants:

```go
const (
	HeaderDAHintIdx = 0
	DataDAHintIdx   = 1
)
```

#### 11. Positive Highlights

**Excellent Design Patterns**
**Code Quality**
#### Summary

This is a well-executed feature with a clever approach to binary compatibility. The main gaps are:
The code is production-ready after addressing the P0 items. The binary compatibility approach is sound but requires real-world validation before merging.

**Recommendation:** Approve, with required changes to the P0 items.

**CI Status:** ✅ All checks passing or in progress. No test failures detected.
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
**Codecov Report**

❌ Patch coverage is

Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main    #2891      +/-   ##
==========================================
+ Coverage   59.30%   59.42%    +0.12%
==========================================
  Files         105      107        +2
  Lines        9998    10184      +186
==========================================
+ Hits         5929     6052      +123
- Misses       3442     3496       +54
- Partials      627      636        +9
```
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
```go
	cache.SetHeaderDAIncluded(headerHash.String(), res.Height, header.Height())
	hashes[i] = headerHash
}
if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, hashes...); err != nil {
```
This is where the DA height is passed to the sync service to update the p2p store
```go
	Msg("P2P event with DA height hint, triggering targeted DA retrieval")

// Trigger targeted DA retrieval in background via worker pool
s.asyncDARetriever.RequestRetrieval(daHeightHint)
```
This is where the "fetch from DA" is triggered for the current block event height
`pkg/sync/da_hint_container.go` (Outdated)

```go
type SignedHeaderWithDAHint = DAHeightHintContainer[*types.SignedHeader]
type DataWithDAHint = DAHeightHintContainer[*types.Data]

type DAHeightHintContainer[H header.Header[H]] struct {
```
This is a data container to persist the DA hint together with the block header or data.
`types.SignedHeader` and `types.Data` are used all over the place, so I did not modify them but instead introduced this type for the p2p store and transfer only.
It may make sense to make this a proto type. WDYT?
```go
	return nil
}

func (s *SyncService[V]) AppendDAHint(ctx context.Context, daHeight uint64, hashes ...types.Hash) error {
```
Stores the DA height hints
If the DA hint is not in the proto, how do other nodes get knowledge of the hint? Also, how would an existing network handle using this feature? It's breaking, so is it safe to upgrade?
`block/internal/syncing/syncer.go` (Outdated)

```go
"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/da"
coreda "github.com/evstack/ev-node/core/da"
```
nit: gci linter
**julienrbrt** left a comment
Nice! It really makes sense.
However, I share the same concern as @tac0turtle about the upgrade strategy, given it is p2p breaking.
The sync_service wraps the header/data payload in a `DAHeightHintContainer`.

It is a breaking change. Instead of signed header or data types, the p2p network exchanges `DAHeightHintContainer`. This would be incompatible. Also, the existing p2p stores would need migration to work.
Could we broadcast both until every network is updated? Then, for the final version, we can basically discard the previous one.
fyi: This PR is missing a migration strategy for the p2p store (and ideally the network).
* main:
  * refactor(sequencers): persist prepended batch (#2907)
  * feat(evm): add force inclusion command (#2888)
  * feat: DA client, remove interface part 1: copy subset of types needed for the client using blob rpc. (#2905)
  * feat: forced inclusion (#2797)
  * fix: fix and cleanup metrics (sequencers + block) (#2904)
  * build(deps): Bump mdast-util-to-hast from 13.2.0 to 13.2.1 in /docs in the npm_and_yarn group across 1 directory (#2900)
  * refactor(block): centralize timeout in client (#2903)
  * build(deps): Bump the all-go group across 2 directories with 3 updates (#2898)
  * chore: bump default timeout (#2902)
  * fix: revert default db (#2897)
  * refactor: remove obsolete // +build tag (#2899)
  * fix: da visualiser namespace (#2895)
* main:
  * chore: execute goimports to format the code (#2924)
  * refactor(block)!: remove GetLastState from components (#2923)
  * feat(syncing): add grace period for missing force txs inclusion (#2915)
  * chore: minor improvement for docs (#2918)
  * feat: DA Client remove interface part 2, add client for celestia blob api (#2909)
  * chore: update rust deps (#2917)
  * feat(sequencers/based): add based batch time (#2911)
  * build(deps): Bump golangci/golangci-lint-action from 9.1.0 to 9.2.0 (#2914)
  * refactor(sequencers): implement batch position persistence (#2908)
## Overview

Temporary fix until #2891. After #2891, the verification for p2p blocks will be done in the background.

ref: #2906
I have added 2 new types for the p2p store that are binary compatible with `types.Data` and `SignedHeader`. With this, we should be able to roll this out without breaking the in-flight p2p data and store.
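This compatibility claim rests on standard protobuf wire semantics: a decoder skips field numbers it does not know, so a sibling type that reuses the old field numbers and appends a new one keeps old readers working. A toy illustration with hand-rolled varint fields (all names hypothetical; this is not the real ev-node codec, and it handles only wire type 0):

```go
package main

import "fmt"

// putUvarint appends a protobuf-style varint.
func putUvarint(b []byte, v uint64) []byte {
	for v >= 0x80 {
		b = append(b, byte(v)|0x80)
		v >>= 7
	}
	return append(b, byte(v))
}

// readUvarint decodes a varint, returning the value and bytes consumed.
func readUvarint(b []byte) (uint64, int) {
	var v uint64
	var shift uint
	for i, c := range b {
		v |= uint64(c&0x7f) << shift
		if c < 0x80 {
			return v, i + 1
		}
		shift += 7
	}
	return 0, 0
}

// appendVarintField appends a field with wire type 0 (varint).
func appendVarintField(b []byte, fieldNum int, v uint64) []byte {
	b = putUvarint(b, uint64(fieldNum)<<3) // tag: field number, wire type 0
	return putUvarint(b, v)
}

// decodeKnown decodes only the field numbers in `known`, skipping the rest,
// which is what an old protobuf reader does with a newly added field.
func decodeKnown(b []byte, known map[int]bool) map[int]uint64 {
	out := map[int]uint64{}
	for len(b) > 0 {
		tag, n := readUvarint(b)
		b = b[n:]
		val, n := readUvarint(b) // wire type 0 only in this toy
		b = b[n:]
		if f := int(tag >> 3); known[f] {
			out[f] = val
		}
	}
	return out
}

func main() {
	// "New" writer: old field 1 plus appended field 4 (the DA height hint).
	msg := appendVarintField(nil, 1, 7)
	msg = appendVarintField(msg, 4, 1337)

	// "Old" reader knows only field 1 and silently skips field 4.
	fmt.Println(decodeKnown(msg, map[int]bool{1: true})) // map[1:7]
}
```

The real rollout still needs the mixed-version testing the review asks for, since compatibility of length-delimited fields and store entries goes beyond this toy.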
**julienrbrt** left a comment
lgtm! I can see how useful the async retriever will be for force inclusion verification as well. We should have @auricom verify if p2p still works with Eden.
This is going to be really useful for force inclusion checks as well.
* main:
  * build(deps): Bump actions/cache from 4 to 5 (#2934)
  * build(deps): Bump actions/download-artifact from 6 to 7 (#2933)
  * build(deps): Bump actions/upload-artifact from 5 to 6 (#2932)
  * feat: DA Client remove interface part 3, replace types with new code (#2910)
  * DA Client remove interface: Part 2.5, create e2e test to validate that a blob is posted in DA layer. (#2920)
(cherry picked from commit ad3e21b)
Introduce an envelope for headers on DA to fail fast on unauthorized content. Similar approach as in #2891, with a binary compatible sibling type that carries the additional information.

* Add DAHeaderEnvelope type to wrap signed headers on DA
* Binary compatible with the `SignedHeader` proto type
* Includes a signature of the plain content
* DARetriever checks for a valid signature early in the process
* Supports `SignedHeader` for legacy support until the first signed envelope is read
* main:
  * chore: fix some minor issues in the comments (#2955)
  * feat: make reaper poll duration configurable (#2951)
  * chore!: move sequencers to pkg (#2931)
  * feat: Ensure Header integrity on DA (#2948)
  * feat(testda): add header support with GetHeaderByHeight method (#2946)
  * chore: improve code comments clarity (#2947)
  * chore(sequencers): optimize store check (#2945)
CI seems to be having some issues; can these be fixed? Also, was this tested on an existing network? If not, please do that before merging.
* main:
  * fix: inconsistent state detection and rollback (#2983)
  * chore: improve graceful shutdown restarts (#2985)
  * feat(submitting): add posting strategies (#2973)
  * chore: adding syncing tracing (#2981)
  * feat(tracing): adding block production tracing (#2980)
  * feat(tracing): Add Store, P2P and Config tracing (#2972)
  * chore: fix upgrade test (#2979)
  * build(deps): Bump github.com/ethereum/go-ethereum from 1.16.7 to 1.16.8 in /execution/evm/test in the go_modules group across 1 directory (#2974)
  * feat(tracing): adding tracing to DA client (#2968)
  * chore: create onboarding skill (#2971)
  * test: add e2e tests for force inclusion (part 2) (#2970)
  * feat(tracing): adding eth client tracing (#2960)
  * test: add e2e tests for force inclusion (#2964)
  * build(deps): Bump the all-go group across 4 directories with 10 updates (#2969)
  * fix: Fail fast when executor ahead (#2966)
  * feat(block): async epoch fetching (#2952)
  * perf: tune badger defaults and add db bench (#2950)
  * feat(tracing): add tracing to EngineClient (#2959)
  * chore: inject W3C headers into engine client and eth client (#2958)
  * feat: adding tracing for Executor and added initial configuration (#2957)
## Overview

Resolves #2609

The basic idea is to store an additional `DAHeightHint` field within the p2p store. As `SignedHeader` and `Data` are used in other places too, I added a `DAHeightHintContainer` type to wrap the tuple for the store only.

The DA height hint is added by the da_submitter and read in the syncer to fetch the missing DA header/data for the most recent block as required.

Please note: this is a breaking change to the p2p network and store.
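Based on the description above, the container can be pictured as a thin generic wrapper around the payload plus the hint. The following is an illustrative sketch under assumptions (simplified `Payload` constraint and hypothetical `SetDAHint`/`DAHint` accessors), not the actual `pkg/sync/da_hint_container.go`:

```go
package main

import "fmt"

// Payload is a simplified stand-in for the header.Header[H] constraint.
type Payload interface{ Hash() string }

// DAHeightHintContainer persists a payload together with the DA height
// at which it was included; 0 means the hint is unknown.
type DAHeightHintContainer[P Payload] struct {
	Payload      P
	DAHeightHint uint64
}

func (c *DAHeightHintContainer[P]) SetDAHint(h uint64) { c.DAHeightHint = h }
func (c *DAHeightHintContainer[P]) DAHint() uint64     { return c.DAHeightHint }

// data is a hypothetical payload type for the example.
type data struct{ hash string }

func (d data) Hash() string { return d.hash }

func main() {
	// The submitter sets the hint once the payload lands on DA;
	// the syncer later reads it to fetch the matching DA block directly.
	c := DAHeightHintContainer[data]{Payload: data{hash: "abc"}}
	c.SetDAHint(1042)
	fmt.Println(c.Payload.Hash(), c.DAHint())
}
```

Keeping the hint in a wrapper rather than on `SignedHeader`/`Data` themselves confines the breaking change to the p2p store and transfer path, which matches the rationale given in the conversation.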